/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#pragma once

#include <rocksdb/status.h>

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "fmt/format.h"
#include "status.h"

namespace redis {

// Identifier of a stream entry, made of a timestamp part (`ms`) and a sequence part (`seq`).
// IDs are ordered by `ms` first and by `seq` second; the textual form is "<ms>-<seq>".
struct StreamEntryID {
  uint64_t ms = 0;
  uint64_t seq = 0;

  StreamEntryID() = default;
  StreamEntryID(uint64_t ms, uint64_t seq) : ms(ms), seq(seq) {}

  // Resets the ID to 0-0, the same value Minimum() returns.
  void Clear() {
    ms = 0;
    seq = 0;
  }

  // True only for the largest representable ID, i.e. both parts equal UINT64_MAX.
  // Using anything smaller for `ms` would let a valid ID compare greater than "the maximum".
  bool IsMaximum() const { return ms == UINT64_MAX && seq == UINT64_MAX; }
  // True only for the smallest ID, 0-0.
  bool IsMinimum() const { return ms == 0 && seq == 0; }

  // Lexicographic ordering on (ms, seq); every other relational operator is derived from it.
  bool operator<(const StreamEntryID &rhs) const {
    if (ms < rhs.ms) return true;
    if (ms == rhs.ms) return seq < rhs.seq;
    return false;
  }

  bool operator>=(const StreamEntryID &rhs) const { return !(*this < rhs); }

  bool operator>(const StreamEntryID &rhs) const { return rhs < *this; }

  bool operator<=(const StreamEntryID &rhs) const { return !(rhs < *this); }

  bool operator==(const StreamEntryID &rhs) const { return ms == rhs.ms && seq == rhs.seq; }

  bool operator!=(const StreamEntryID &rhs) const { return !(*this == rhs); }

  // Formats the ID as "<ms>-<seq>" with both parts in decimal.
  std::string ToString() const { return std::to_string(ms) + "-" + std::to_string(seq); }

  // Smallest possible ID: 0-0.
  static StreamEntryID Minimum() { return StreamEntryID{0, 0}; }
  // Largest possible ID: UINT64_MAX-UINT64_MAX. No ID compares greater than this value.
  static StreamEntryID Maximum() { return StreamEntryID{UINT64_MAX, UINT64_MAX}; }
};

// Abstract interface for the policies that pick the ID of the next stream entry.
// The concrete strategies declared later in this header all derive from it.
class NextStreamEntryIDGenerationStrategy {
 public:
  // Virtual so that strategies can be destroyed through a pointer to this base class.
  virtual ~NextStreamEntryIDGenerationStrategy() = default;

  // Produces the next entry ID given `last_id`.
  // NOTE(review): presumably the generated ID is written to `*next_id` and a non-OK Status
  // signals that no valid ID could be produced — confirm against the implementations.
  virtual Status GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) = 0;
};

class FullySpecifiedEntryID : public NextStreamEntryIDGenerationStrategy {
 public:
  explicit FullySpecifiedEntryID(StreamEntryID id) : id_(std::move(id)) {}
  ~FullySpecifiedEntryID() override = default;

  Status GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) override;

 private:
  StreamEntryID id_;
};

// Stateless strategy that takes no caller-supplied ID components.
// The constructor and destructor are left implicit: the class has no members, and the
// destructor stays virtual through the base class.
class AutoGeneratedEntryID : public NextStreamEntryIDGenerationStrategy {
 public:
  // Defined out of line.
  // NOTE(review): presumably derives the ID from the current time and `last_id` — confirm.
  Status GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) override;
};

class SpecificTimestampWithAnySequenceNumber : public NextStreamEntryIDGenerationStrategy {
 public:
  explicit SpecificTimestampWithAnySequenceNumber(uint64_t ms) : ms_(ms){};
  ~SpecificTimestampWithAnySequenceNumber() override = default;

  Status GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) override;

 private:
  uint64_t ms_;
};

class CurrentTimestampWithSpecificSequenceNumber : public NextStreamEntryIDGenerationStrategy {
 public:
  explicit CurrentTimestampWithSpecificSequenceNumber(uint64_t seq) : seq_(seq) {}
  ~CurrentTimestampWithSpecificSequenceNumber() override = default;

  Status GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) override;

 private:
  uint64_t seq_;
};

// Selects how a stream is trimmed. The underlying type is spelled out; `int` is what a
// scoped enum uses by default, so the representation is unchanged.
enum class StreamTrimStrategy : int {
  None = 0,    // default value of StreamTrimOptions::strategy
  MaxLen = 1,  // NOTE(review): presumably trims using StreamTrimOptions::max_len — confirm
  MinID = 2,   // NOTE(review): presumably trims using StreamTrimOptions::min_id — confirm
};

// One stream entry: a string key plus the list of strings stored with it.
// NOTE(review): `key` presumably holds the entry ID in textual form — confirm against callers.
struct StreamEntry {
  std::string key;
  std::vector<std::string> values;

  // Takes both arguments by value and moves them into the members, so callers can hand over
  // temporaries without an extra copy.
  StreamEntry(std::string entry_key, std::vector<std::string> entry_values)
      : key(std::move(entry_key)), values(std::move(entry_values)) {}
};

// Parameters describing how a stream should be trimmed.
// NOTE(review): presumably `max_len` is consulted for StreamTrimStrategy::MaxLen and `min_id`
// for StreamTrimStrategy::MinID — confirm against the trimming code.
struct StreamTrimOptions {
  // Zero-initialized so a default-constructed object never exposes an indeterminate value.
  uint64_t max_len = 0;
  StreamEntryID min_id;
  // No trimming unless a strategy is explicitly selected.
  StreamTrimStrategy strategy = StreamTrimStrategy::None;
};

// Options bundle for adding an entry to a stream.
struct StreamAddOptions {
  // Trimming parameters carried together with the add request.
  StreamTrimOptions trim_options;
  // Owned strategy object used to choose the new entry's ID; null until one is assigned.
  std::unique_ptr<NextStreamEntryIDGenerationStrategy> next_id_strategy;
  // NOTE(review): presumably mirrors the NOMKSTREAM flag of XADD — confirm against the parser.
  bool nomkstream{false};
};

// Parameters of a range query over stream entries.
struct StreamRangeOptions {
  // Range bounds; both default to 0-0 through StreamEntryID's default constructor.
  StreamEntryID start;
  StreamEntryID end;
  // Zero-initialized so a default-constructed object never exposes an indeterminate value.
  // NOTE(review): presumably only meaningful when `with_count` is true — confirm.
  uint64_t count = 0;
  bool with_count = false;
  bool reverse = false;
  // NOTE(review): presumably turn the corresponding bound into an exclusive one — confirm.
  bool exclude_start = false;
  bool exclude_end = false;
};

// Options for a stream length query.
struct StreamLenOptions {
  // Reference entry ID; defaults to 0-0.
  // NOTE(review): presumably only used when `with_entry_id` is true — confirm.
  StreamEntryID entry_id;
  bool with_entry_id{false};
  // NOTE(review): presumably selects counting towards the first entry rather than the last — confirm.
  bool to_first{false};
};

// Options for creating a consumer group.
struct StreamXGroupCreateOptions {
  // NOTE(review): presumably mirrors the MKSTREAM flag of XGROUP CREATE — confirm.
  bool mkstream{false};
  // Defaults to -1.
  // NOTE(review): -1 presumably means "not specified" — confirm.
  int64_t entries_read{-1};
  // Starting ID for the group, kept as the raw string; empty by default.
  std::string last_id;
};

// Options for claiming pending stream entries.
// NOTE(review): the flags presumably correspond to the XCLAIM arguments (IDLE, TIME, RETRYCOUNT,
// FORCE, JUSTID, LASTID) — confirm against the command parser.
struct StreamClaimOptions {
  uint64_t idle_time_ms = 0;
  bool with_time = false;
  bool with_retry_count = false;
  bool force = false;
  bool just_id = false;
  bool with_last_id = false;
  // Zero-initialized so a default-constructed object never exposes indeterminate values.
  // NOTE(review): presumably only meaningful when `with_time` / `with_retry_count` are set — confirm.
  uint64_t last_delivery_time_ms = 0;
  uint64_t last_delivery_count = 0;
  // Defaults to 0-0.
  StreamEntryID last_delivered_id;
};

// Options for automatically claiming pending stream entries.
struct StreamAutoClaimOptions {
  // Zero-initialized so a default-constructed object never exposes an indeterminate value.
  uint64_t min_idle_time_ms = 0;
  // Defaults to 100.
  uint64_t count = 100;
  // Defaults to 10.
  // NOTE(review): presumably multiplied by `count` to bound how many entries are scanned — confirm.
  uint64_t attempts_factors = 10;
  // Defaults to 0-0.
  StreamEntryID start_id;
  bool just_id = false;
  bool exclude_start = false;
};

// Bookkeeping record for one consumer group. Every field has a default value.
struct StreamConsumerGroupMetadata {
  uint64_t consumer_number{0};
  uint64_t pending_number{0};
  // Defaults to 0-0.
  StreamEntryID last_delivered_id;
  // Defaults to -1.
  // NOTE(review): -1 presumably means "unknown" — confirm.
  int64_t entries_read{-1};
  uint64_t lag{0};
};

// Bookkeeping record for one consumer.
struct StreamConsumerMetadata {
  uint64_t pending_number = 0;
  // Zero-initialized so a default-constructed record never exposes indeterminate timestamps.
  // NOTE(review): the `_ms` suffix suggests milliseconds — confirm the unit and epoch with callers.
  uint64_t last_attempted_interaction_ms = 0;
  uint64_t last_successful_interaction_ms = 0;
};

// Discriminates the kinds of stream sub-records. The numeric values are kept explicit.
// NOTE(review): the values are presumably persisted as part of encoded keys, so they must not
// be renumbered — confirm against the encoding code.
// The underlying type is spelled out; `int` is what a scoped enum uses by default.
enum class StreamSubkeyType : int {
  StreamEntry = 0,
  StreamConsumerGroupMetadata = 1,
  StreamConsumerMetadata = 2,
  StreamPelEntry = 3,
};

// Delivery bookkeeping for one entry together with the name of a consumer.
// NOTE(review): "Pel" presumably stands for the pending entries list — confirm.
struct StreamPelEntry {
  // Zero-initialized so a default-constructed record never exposes indeterminate values.
  uint64_t last_delivery_time_ms = 0;
  uint64_t last_delivery_count = 0;
  std::string consumer_name;
};

// Summary information about a stream.
struct StreamInfo {
  // Zero-initialized so a default-constructed object never exposes indeterminate counters.
  uint64_t size = 0;
  uint64_t entries_added = 0;
  // All three IDs default to 0-0.
  StreamEntryID last_generated_id;
  StreamEntryID max_deleted_entry_id;
  StreamEntryID recorded_first_entry_id;
  // Owned, optional entries; null by default.
  std::unique_ptr<StreamEntry> first_entry;
  std::unique_ptr<StreamEntry> last_entry;
  // NOTE(review): presumably only filled when a full listing is requested — confirm.
  std::vector<StreamEntry> entries;
};

// Entries read from one stream, paired with a name.
// NOTE(review): `name` is presumably the stream's name — confirm against callers.
struct StreamReadResult {
  std::string name;
  std::vector<StreamEntry> entries;

  // Both arguments are taken by value and moved into the members.
  StreamReadResult(std::string stream_name, std::vector<StreamEntry> stream_entries)
      : name(std::move(stream_name)), entries(std::move(stream_entries)) {}
};

// Result container holding a list of ID strings and a list of entries.
// NOTE(review): presumably produced by the claim path, with `ids` filled when only IDs are
// requested (cf. StreamClaimOptions::just_id) and `entries` otherwise — confirm against the caller.
struct StreamClaimResult {
  std::vector<std::string> ids;
  std::vector<StreamEntry> entries;
};

// Result container holding an ID string, a list of entries and a list of ID strings.
// NOTE(review): presumably produced by the auto-claim path, where `next_claim_id` is the ID to
// resume from and `deleted_ids` lists pending IDs whose entries no longer exist — confirm.
struct StreamAutoClaimResult {
  std::string next_claim_id;
  std::vector<StreamEntry> entries;
  std::vector<std::string> deleted_ids;
};

// Options for querying pending entries.
struct StreamPendingOptions {
  uint64_t idle_time = 0;
  bool with_time = false;

  // By default the ID range spans everything from Minimum() to Maximum().
  StreamEntryID start_id{StreamEntryID::Minimum()};
  StreamEntryID end_id{StreamEntryID::Maximum()};

  // Zero-initialized so a default-constructed object never exposes an indeterminate value.
  // NOTE(review): presumably only meaningful when `with_count` is true — confirm.
  uint64_t count = 0;
  bool with_count = false;
  bool with_consumer = false;

  // NOTE(review): `consumer` is presumably only used when `with_consumer` is true — confirm.
  std::string consumer;
  std::string stream_name;
  std::string group_name;
};

// Summary of pending entries: a count, first and last entry IDs, and per-consumer pairs.
struct StreamGetPendingEntryResult {
  // Zero-initialized so a default-constructed result never exposes an indeterminate count.
  uint64_t pending_number = 0;
  // Both IDs default to 0-0.
  StreamEntryID first_entry_id;
  StreamEntryID last_entry_id;
  // NOTE(review): presumably (consumer name, number of pending entries) pairs — confirm.
  std::vector<std::pair<std::string, int>> consumer_infos;
};

// Pairs an entry ID with its StreamPelEntry record.
// NOTE(review): presumably represents one entry that was delivered but not yet acknowledged — confirm.
struct StreamNACK {
  StreamEntryID id;
  StreamPelEntry pel_entry;
};

// Helpers for stream entry IDs. Only the declarations live in this header; the definitions are
// not visible here, so the notes below are assumptions to be confirmed against them.

// NOTE(review): presumably advances `*id` to the next greater ID and returns a non-OK Status
// when the ID cannot be incremented — confirm.
Status IncrementStreamEntryID(StreamEntryID *id);
// NOTE(review): presumably parses the textual form of an ID from `input` into `*id` — confirm
// which spellings are accepted.
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
// Returns an owned ID generation strategy built from `input`, or an error status.
// NOTE(review): the mapping from input spellings to concrete strategies is not visible here.
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(const std::string &input);

// NOTE(review): presumably parse the start / end bound of a range query into `*id` — confirm
// how special or incomplete IDs are handled.
Status ParseRangeStart(const std::string &input, StreamEntryID *id);
Status ParseRangeEnd(const std::string &input, StreamEntryID *id);

// Encoding helpers for entry values: the first turns a list of strings into one string, the
// second takes a string and fills `*result` with a list of strings.
// NOTE(review): presumably inverse operations of each other — confirm the wire format.
std::string EncodeStreamEntryValue(const std::vector<std::string> &args);
Status DecodeRawStreamEntryValue(const std::string &value, std::vector<std::string> *result);

}  // namespace redis
